#!/usr/bin/env python2
# coding:utf-8

import os
import sys
import socket
import time
import httplib
import urllib
import subprocess
import fcntl
import types
import errno
import getopt
import random
import threading
import Queue

from wsgiref.simple_server import make_server

from daemon import Daemon

def _exec_shell(cmd, retry = 3, p = True):
    """Run ``cmd`` through the shell, retrying while it exits with EAGAIN.

    :param cmd: shell command line; it is run with ``shell=True``, so it must
        not contain untrusted text.
    :param retry: number of extra attempts made when the command exits with
        status ``errno.EAGAIN``; each retry is preceded by a 1 second sleep.
    :param p: when true, echo ``cmd`` to stdout before running it.
    :returns: None when the command exits with status 0.
    :raises Exception: when the command exits non-zero (other than a retried
        EAGAIN) or is killed by a signal.

    On Ctrl-C the child (if still running) is killed and the process exits
    with status ``errno.EINTR``.
    """
    if p:
        print(cmd)
    attempts = 0
    while True:
        # Kept separate from the ``p`` flag: the original reused ``p`` for the
        # Popen object, so a Ctrl-C before the child existed called .kill()
        # on a bool.
        proc = None
        try:
            proc = subprocess.Popen(cmd, shell=True)
            proc.communicate()
            ret = proc.returncode
            if ret == 0:
                return
            if ret == errno.EAGAIN and attempts < retry:
                attempts += 1
                time.sleep(1)
                continue
        except KeyboardInterrupt:
            # Only signal a child that has not been reaped yet; killing a
            # reaped pid can fail or hit an unrelated process.
            if proc is not None and proc.poll() is None:
                proc.kill()
            sys.exit(errno.EINTR)

        # A negative returncode means the child was killed by that signal;
        # os.strerror() has no meaningful text for it.
        if ret < 0:
            reason = "killed by signal %d" % -ret
        else:
            reason = os.strerror(ret)
        raise Exception("%s: %s" % (cmd, reason))

def ping_ok(host):
    """Return True if ``host`` answers ``ping <host> -c 3 -W 1``.

    ``host`` can originate from HTTP query parameters and the command is run
    through the shell (via _exec_shell), so anything that is not a plain
    host name / IP literal is rejected (returns False) instead of being
    interpolated into the command line.
    """
    import re
    # Letters, digits, '.', '_', ':' and '-' cover host names and IPv4/IPv6
    # literals. A leading '-' would be parsed by ping as an option. \Z (not $)
    # so that a trailing newline cannot sneak a second shell command in.
    if not host or not re.match(r'[A-Za-z0-9_.:][A-Za-z0-9_.:-]*\Z', host):
        return False
    cmd = 'ping %s -c 3 -W 1' % (host)
    try:
        _exec_shell(cmd)
        return True
    except Exception:
        return False

class Lich_check_suspend_admin(Daemon):
    """Arbiter that detects a "suspended" cluster node and broadcasts it.

    Agents report heartbeat timeouts (``src`` could not reach ``dist``) to the
    embedded HTTP server at ``/hb_timeout``.  A checker thread periodically
    tallies the reports of the last 30 seconds.  When, after discarding the
    reports sent by the most-reported node, exactly one destination remains
    and it is reported by a strict majority of the known hosts, that node is
    pinged.  If it answers, it is stored in ``self.netblock`` and POSTed to
    the agent on every host.  If it does not answer, it is treated as offline
    and the reports about it are dropped.
    """

    def __init__(self, admin_port=None, home=None):
        self.role = "lich_check_suspend_admin"
        # Port of the embedded HTTP server receiving hb_timeout reports.
        self.admin_port = admin_port
        # Agent port netblock notifications are sent to; learned from the
        # ``port`` query parameter of incoming reports.
        self.agent_port = 0
        self.home = "/opt/mds/" if home is None else home

        # Cluster membership as last reported by an agent.
        self.hosts = []
        self.start_time = None

        # Heartbeat timeout reports in arrival order:
        # [[time, src, dist], [time, src, dist], ...]
        self.hb_timeouts = []

        self.connection = None
        self.threads = []
        # A one-slot queue used as a lock whose acquire can time out (lock()).
        self.queue = Queue.Queue(1)
        self.queue.put(1, block=False)

        # The detected suspended node; not reset anywhere in this class.
        self.netblock = None

        os.system('mkdir -p ' + self.home + '/tmp')
        os.system('mkdir -p ' + self.home + '/log')
        pidfile = '/var/run/%s.pid' % (self.role)
        log = self.home + '/log/%s.log' % (self.role)
        os.system('touch ' + log)
        super(Lich_check_suspend_admin, self).__init__(pidfile, '/dev/null', log, log, self.role)

    # ------------------------------------------------------------------
    # locking
    # ------------------------------------------------------------------
    def lock(self, timeout=3):
        """Acquire the state lock, waiting at most ``timeout`` seconds.

        Raises Queue.Empty if the lock is not acquired in time.
        """
        self.queue.get(block=True, timeout=timeout)

    def unlock(self):
        """Release the state lock (raises Queue.Full if it is not held)."""
        self.queue.put(1, block=False)

    def _with_lock(self, func, *args):
        """Call ``func(*args)`` while holding the state lock; return its result."""
        self.lock()
        try:
            return func(*args)
        finally:
            self.unlock()

    # ------------------------------------------------------------------
    # timeout bookkeeping (double-underscore versions: caller holds lock)
    # ------------------------------------------------------------------
    def __del_older_timeout(self):
        """Drop stale reports from the head of hb_timeouts.

        A report is stale when it is more than 30 seconds old, or stamped in
        the future (the clock went backwards).  Scanning stops at the first
        fresh report.
        """
        now = int(time.time())
        while self.hb_timeouts:
            age = now - self.hb_timeouts[0][0]
            if 0 <= age <= 30:
                break
            print('del older timout: %s, now: %s' % (self.hb_timeouts[0], now))
            del self.hb_timeouts[0]

    def _del_older_timeout(self):
        self._with_lock(self.__del_older_timeout)

    def __del_offline_timeout(self, offline_host):
        """Drop every report whose destination host is ``offline_host``."""
        kept = []
        for x in self.hb_timeouts:
            if x[-1].split("/")[0] == offline_host:
                print('del offline %s timeout %s ' % (offline_host, x))
            else:
                kept.append(x)
        self.hb_timeouts = kept

    def _del_offline_timeout(self, offline_host):
        self._with_lock(self.__del_offline_timeout, offline_host)

    def __filter_timeout_byhost(self):
        """Return the distinct [src_host, dist_host] pairs, in first-seen order.

        Host names are the part of each endpoint before the first '/'.
        """
        pairs = []
        seen = set()
        for x in self.hb_timeouts:
            pair = (x[-2].split("/")[0], x[-1].split("/")[0])
            if pair not in seen:
                seen.add(pair)
                pairs.append(list(pair))
        return pairs

    def _filter_timeout_byhost(self):
        return self._with_lock(self.__filter_timeout_byhost)

    # ------------------------------------------------------------------
    # voting
    # ------------------------------------------------------------------
    def _get_votes(self, timeouts):
        """Map each destination (last element) to the number of entries naming it."""
        votes_dist = {}
        for x in timeouts:
            dist = x[-1]
            votes_dist[dist] = votes_dist.get(dist, 0) + 1
        return votes_dist

    def _get_max_vote(self, votes):
        """Return the key with the highest count, or None for an empty dict.

        Ties go to the key that comes first in iteration order.
        """
        if not votes:
            return None
        return max(votes, key=votes.get)

    def __get_netblock(self, timeouts):
        """Pick the suspended node from [src, dist] pairs, or None if undecided."""
        if not timeouts:
            return None

        if not self.hosts:
            print("WARN self.hosts was null")
            return None

        # The most-reported destination is the suspected suspended node.
        suspect = self._get_max_vote(self._get_votes(timeouts))

        # Ignore the reports sent *by* the suspect (pair = [src, dist]);
        # presumably a suspended node reports everyone else as timed out.
        votes_dist = self._get_votes([x for x in timeouts if x[-2] != suspect])

        if len(votes_dist) > 1:
            # more than one host was hb_timeout, so the result is ambiguous
            print("WARN more than one timeout, votes_dist: %s, hosts: %s, self.timeouts: %s, now: %s" % (
                    votes_dist, self.hosts, self.hb_timeouts, time.time()))
            return None
        if not votes_dist:
            print("WARN no timeout,  votes_dist: %s, hosts: %s, self.timeouts: %s, now: %s" % (
                    votes_dist, self.hosts, self.hb_timeouts, time.time()))
            return None

        dist = self._get_max_vote(votes_dist)
        # Require a strict majority of the known hosts (integer division so
        # the threshold is the same on Python 2 and 3).
        if votes_dist[dist] >= len(self.hosts) // 2 + 1:
            return dist
        return None

    def _get_netblock(self, timeouts):
        return self._with_lock(self.__get_netblock, timeouts)

    def get_netblock(self):
        """Return the suspended node, detecting it from recent reports if needed.

        Only one suspended node is handled at a time: once self.netblock is
        set it is returned unchanged.  A candidate that does not answer ping
        is considered offline and the reports about it are discarded.
        """
        if self.netblock:
            print("WARN has netblock: %s, just send" % (self.netblock))
            return self.netblock

        self._del_older_timeout()
        timeouts = self._filter_timeout_byhost()
        netblock = self._get_netblock(timeouts)
        if netblock:
            if ping_ok(netblock):
                self.netblock = netblock
                return netblock
            self._del_offline_timeout(netblock)

        return None

    # ------------------------------------------------------------------
    # outbound notification
    # ------------------------------------------------------------------
    def send_netblock(self, netblock, timeout=10):
        """POST ``netblock`` to the agent (self.agent_port) on every known host.

        Failures are logged per host and do not stop the broadcast.
        ``timeout`` (seconds) bounds each connection so that one unresponsive
        agent cannot block the checker thread forever.
        """
        params = urllib.urlencode({
            'netblock': netblock,
            })
        headers = {"Content-type": "application/x-www-form-urlencoded"}
        for x in self.hosts:
            # Reset per host: otherwise a failing constructor leaves ``conn``
            # unbound (or pointing at the previous host's connection).
            conn = None
            try:
                conn = httplib.HTTPConnection(x, self.agent_port, timeout=timeout)
                conn.request("POST", '/netblock?netblock=' + netblock, params, headers)
                conn.getresponse().read()
            except Exception as e:
                print('send netblock exp: %s host:  %s' % (e, x))
            finally:
                if conn:
                    conn.close()

    def text2dict(self, body):
        """Parse ``key:value`` lines into a dict.

        Only the first ':' separates key and value, so values such as
        ``host:port`` survive intact.  A line without ':' raises IndexError.
        """
        result = {}
        for line in body.strip().split('\n'):
            parts = line.split(':', 1)
            result[parts[0]] = parts[1]
        return result

    def get_delay(self, send):
        """Seconds to wait after broadcast number ``send``: 1, or 60 once send > 120."""
        if send > 120:
            return 60
        return 1

    # ------------------------------------------------------------------
    # worker threads
    # ------------------------------------------------------------------
    def _start_check_suspend(self):
        """Checker loop: detect a suspended node and keep broadcasting it."""
        delay = 1
        send = 0
        while True:
            netblock = self.get_netblock()
            if netblock:
                print("get netblock %s, self.hosts: %s, self.timeouts: %s, now: %s" % (
                        netblock, self.hosts, self.hb_timeouts, time.time()))
                self.send_netblock(netblock)
                delay = self.get_delay(send)
                send = send + 1

            time.sleep(delay)

    def _start_httpserver(self):
        """Serve heartbeat-timeout reports on 0.0.0.0:admin_port (blocks forever).

        Handled request:
            /hb_timeout?from=<src>&to=<dist>&hosts=<h1;h2;...>&port=<agent_port>
        Query values are used as-is (no URL decoding).  Any other path gets a
        200 "unsupport" reply.
        """
        def get_hb_timeout(environ, start_response):
            # Called with the state lock held (see application()).
            query = environ.get("QUERY_STRING", None)
            querys = str2dict(query)
            # Missing parameters become "" so they are rejected below instead
            # of raising AttributeError on None.strip().
            src = (querys.get("from") or "").strip()
            dist = (querys.get("to") or "").strip()

            # Only update the agent port when the report carries one, so a
            # report without it does not erase a previously learned port.
            port = querys.get('port', None)
            if port:
                self.agent_port = port

            hosts = querys.get('hosts', None)
            if hosts:
                hosts_new = hosts.split(";")
                if set(self.hosts) ^ set(hosts_new):
                    print("reset hosts %s to %s" % (self.hosts, hosts_new))
                    self.hosts = hosts_new

            if src and dist:
                print("recive hb timeout: from: %s, to: %s" % (src, dist))
                self.hb_timeouts.append([int(time.time()), src, dist])
            else:
                print("WARN error querys: %s, now: %s" % (
                    query, time.time()))
            return 'OK'

        def application(environ, start_response):
            path = environ.get('PATH_INFO')
            if path == '/hb_timeout':
                response_body = self._with_lock(get_hb_timeout, environ, start_response)
            else:
                response_body = "unsupport, just support hb_timeout"

            status = "200 OK"
            response_headers = [("Content-Length", str(len(response_body)))]
            start_response(status, response_headers)
            return [response_body]

        try:
            httpd = make_server('0.0.0.0', int(self.admin_port), application)
            httpd.serve_forever()
        except Exception as e:
            # Not every failure carries errno/strerror (e.g. int(None) when no
            # admin port was configured); don't crash inside the handler.
            code = getattr(e, 'errno', None)
            print('%s %s' % (e, code))
            raise Exception(code, getattr(e, 'strerror', None) or str(e))

    def _start_thread(self, name, target):
        """Start ``target`` in a named daemon thread tracked in self.threads."""
        t = threading.Thread(target=target, name=name)
        t.daemon = True
        self.threads.append(t)
        t.start()
        return t

    def start_check_suspend(self):
        self._start_thread("start_check_suspend", self._start_check_suspend)

    def start_httpserver(self):
        self._start_thread("start_httpserver", self._start_httpserver)

    def wait_finish(self):
        """Poll every 3 seconds; raise as soon as any worker thread has died."""
        while True:
            fails = [t for t in self.threads if not t.is_alive()]
            if fails:
                raise Exception("%s was fail" % ",".join(t.name for t in fails))
            time.sleep(3)

    def init(self):
        self.start_time = int(time.time())

    def run(self):
        """Start the HTTP server and checker threads, then supervise them."""
        self.init()
        self.start_httpserver()
        self.start_check_suspend()
        self.wait_finish()

def str2dict(s):
    """Parse a raw ``k1=v1&k2=v2`` query string into a dict of strings.

    Values are not URL-decoded.  Only the first '=' separates key and value,
    so ``a=x=y`` yields ``{'a': 'x=y'}`` instead of raising ValueError.
    Empty segments (including an empty query string) are skipped silently.
    Other segments without '=' are logged and skipped.  ``None`` yields ``{}``.
    """
    if s is None:
        return {}

    r = {}
    for x in s.split("&"):
        if not x:
            continue
        kv = x.split("=", 1)
        if len(kv) < 2:
            print("WARN query string was format error: %s" % s)
            continue
        r[kv[0]] = kv[1]

    return r

def usage():
    """Print the command-line synopsis and exit with status 1."""
    prog = sys.argv[0]
    print("usage:")
    print(prog + " --start --admin_port <port>")
    print(prog + " --stop")
    print(prog + " --test --admin_port <port>")
    # sys.exit rather than the site-provided exit(), which is absent under
    # ``python -S`` and in embedded/frozen interpreters.
    sys.exit(1)

def main():
    """Parse the command line and start, stop or run the arbiter.

    ``--start`` and ``--stop`` call the Daemon's start()/stop().
    ``--test`` calls run() directly in this process.  ``--start`` and
    ``--test`` need ``--admin_port``.  Invalid usage prints a message and
    exits through usage().
    """
    op = ''
    admin_port = None
    try:
        opts, _ = getopt.getopt(
                sys.argv[1:],
                'h', ['start', 'stop', 'help', 'test', "admin_port="]
                )
    except getopt.GetoptError as err:
        print(str(err))
        usage()
        return

    for o, a in opts:
        # A real tuple: ``o in ('--help')`` was a substring test on a string.
        if o in ('-h', '--help'):
            usage()
        elif o in ('--start', '--stop', '--test'):
            op = o
        elif o == '--admin_port':
            admin_port = a
        else:
            # getopt only yields the options declared above; use an explicit
            # exit instead of an assert that ``python -O`` would strip.
            print('oops, unhandled option: %s, -h for help' % o)
            sys.exit(1)

    if not op:
        print('oops, no operation given (--start, --stop or --test), -h for help')
        usage()
        return
    if op != '--stop' and admin_port is None:
        # Without a port the HTTP server thread fails at startup.
        print('oops, %s requires --admin_port <port>, -h for help' % op)
        usage()
        return

    lich_check = Lich_check_suspend_admin(admin_port)
    if op == '--start':
        lich_check.start()
    elif op == '--stop':
        lich_check.stop()
    else:
        lich_check.run()

if __name__ == '__main__':
    # Hand any arguments to main(); a bare invocation just prints the help.
    if len(sys.argv) > 1:
        main()
    else:
        usage()
